package org.zjt.spark.dstream

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

/**
  * DESC    Periodically scans a directory and processes the files that appear in it
  *         (a newly added file triggers processing): each line is split on commas
  *         and the per-batch count of every token is printed.
  *
  * Implemented with an explicit `main` method rather than `extends App`: the `App`
  * trait defers the object body through `DelayedInit`, and the Spark documentation
  * warns that subclasses of `scala.App` may not work correctly.
  *
  * @author
  * @create 2017-05-16 9:42 AM
  **/
object FileDStreamTest {

  /** Directory monitored when no path is supplied on the command line. */
  private val DefaultInputDir = "file:///Users/zhangjuntao/IdeaProjects/myproject/test"

  /**
    * Builds the streaming word-count pipeline, starts it and blocks until it terminates.
    *
    * @param args optional command-line arguments; when present, `args(0)` replaces
    *             the default directory to monitor
    */
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setAppName("HDFSWordCount").setMaster("local[2]")

    // Create the streaming context with a 6-second batch interval.
    val ssc = new StreamingContext(sparkConf, Seconds(6))

    // Fall back to the original hard-coded directory so that running without
    // arguments behaves exactly as before.
    val inputDir = args.headOption.getOrElse(DefaultInputDir)

    // Files are processed when new ones are found in the monitored directory.
    val lines = ssc.textFileStream(inputDir)
    val words = lines.flatMap(_.split(","))
    val wordCounts = words.map(word => (word, 1)).reduceByKey(_ + _)
    wordCounts.print()

    ssc.start()
    ssc.awaitTermination()
  }
}
